package com.atguigu.flink.timer;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/15

 在指定的时间，做一件事。
    在flink中时间有两种语义。
        processingTime:  看计算机的物理时钟
        eventTime:  看水印

 当WaterSensor的水位超过30，就制定一个10s后触发的定时器，进行预警。
 当水位降低到30及以下时，删除之前制定的定时器，取消预警。

 只有KeyedStream才能使用定时器。定时器需要和key绑定！每种key都有自己的定时器，互不干扰。
 */
public class Demo1_ProcessingTimeTimer
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

                env
                   .socketTextStream("hadoop102", 8888)
                   .map(new WaterSensorMapFunction())
                    .keyBy(WaterSensor::getId)
                    .process(new KeyedProcessFunction<String, WaterSensor, String>()
                    {
                        @Override
                        public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                            String key = ctx.getCurrentKey();
                            TimerService timerService = ctx.timerService();
                            //定时间
                            if (value.getVc() > 30){
                                time = timerService.currentProcessingTime() + 10000;
                                timerService.registerProcessingTimeTimer(time);
                                System.out.println(key +"指定了以下时间的定时器:"+time);
                            }else {
                                timerService.deleteProcessingTimeTimer(time);
                                System.out.println(key +"删除了以下时间的定时器:"+time);
                            }
                        }

                        //定时器到点执行的程序
                        @Override
                        public void onTimer(long timestamp,OnTimerContext ctx, Collector<String> out) throws Exception {
                            out.collect(ctx.getCurrentKey() +" 到了"+timestamp+"点,预警......");
                        }

                        long time ;

                    })
                    .print();


                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

    }
}
